package com.meta.api.flink;

import com.meta.api.flink.aggregate.BooleanSummationFunctions;
import com.meta.api.flink.aggregate.BooleanTrendContinueFunctions;
import com.meta.api.flink.aggregate.BooleanTrendSummationFunctions;
import com.meta.api.flink.columes.AmplitudeColumn;
import com.meta.api.flink.columes.MaxColumn;
import com.meta.api.flink.columes.MinColumn;
import com.meta.api.flink.columes.UnixMillisColumn;

import java.util.Map;
import java.util.Objects;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Environment utility methods
/**
 * Static helpers that configure a Flink streaming job: general environment
 * settings, checkpointing, user-defined function registration and the
 * parallelism stored on the task configuration entity.
 */
public class EnvironmentTools {

    /**
     * Applies the general job settings: restart strategy, default parallelism,
     * time characteristic and latency tracking interval.
     *
     * @param env         execution environment to configure
     * @param parallelism default parallelism to set on {@code env}
     */
    public static void environmentConf(StreamExecutionEnvironment env, int parallelism) {
        // Failure-rate restart strategy: at most 5 failures per 20-minute
        // interval, with a 30-second delay between restart attempts.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                // max failures per interval
                5,
                // failure-rate interval
                Time.minutes(20),
                // delay between restarts
                Time.seconds(30)
        ));
        // Default parallelism
        env.setParallelism(parallelism);
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Hard-coded latency tracking interval of 1000
        // (presumably milliseconds -- confirm against the Flink ExecutionConfig docs).
        env.getConfig().setLatencyTrackingInterval(1000L);
    }

    /**
     * Configures checkpointing and the state backend from {@code flinkConf}.
     *
     * <p>Nothing is configured when {@code local} is {@code true} or when
     * {@code flink.checkpoint.state.backend.type} is {@code "none"}. Otherwise
     * the following keys are read:
     * <ul>
     *   <li>{@code flink.checkpoint.enable.time}</li>
     *   <li>{@code flink.checkpoint.min.pause.between.checkpoints}</li>
     *   <li>{@code flink.checkpoint.checkpoint.timeout}</li>
     *   <li>{@code flink.checkpoint.max.concurrent.checkpoints}</li>
     *   <li>{@code flink.checkpoint.hdfs.checkpoint.data.uri} (only for type {@code "fs"})</li>
     * </ul>
     * All numeric settings are parsed before {@code env} is modified, so an
     * invalid value does not leave the environment half-configured.
     *
     * @param local     when {@code true}, checkpoint configuration is skipped
     * @param env       execution environment to configure
     * @param flinkConf configuration map; must not be {@code null} unless {@code local} is {@code true}
     * @throws NullPointerException  if {@code flinkConf} is {@code null} and {@code local} is {@code false}
     * @throws NumberFormatException if a required numeric setting is missing or not a valid number
     */
    public static void checkPointingConf(boolean local, StreamExecutionEnvironment env, Map<String, String> flinkConf) {
        // Local runs skip checkpoint configuration entirely.
        if (local) {
            return;
        }
        Objects.requireNonNull(flinkConf, "flinkConf must not be null when checkpointing is configured");

        String type = flinkConf.get("flink.checkpoint.state.backend.type");
        // Constant-first comparison: a missing backend type must not throw here.
        if ("none".equals(type)) {
            return;
        }

        // Parse everything up front so a bad value fails before env is touched.
        long checkpointInterval = longSetting(flinkConf, "flink.checkpoint.enable.time");
        long minPause = longSetting(flinkConf, "flink.checkpoint.min.pause.between.checkpoints");
        long checkpointTimeout = longSetting(flinkConf, "flink.checkpoint.checkpoint.timeout");
        int maxConcurrent = parseIntSetting(
                "flink.checkpoint.max.concurrent.checkpoints",
                flinkConf.get("flink.checkpoint.max.concurrent.checkpoints"));

        env.enableCheckpointing(checkpointInterval);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep the checkpoint when the job
        //   is cancelled; the checkpoint state must then be cleaned up manually.
        // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint when the job
        //   is cancelled; the checkpoint state is only available if the job fails.
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
        checkpointConfig.setCheckpointTimeout(checkpointTimeout);
        checkpointConfig.setMaxConcurrentCheckpoints(maxConcurrent);

        // Select the state backend.
        StateBackend stateBackend;
        if ("fs".equals(type)) {
            // File-system backend (the original comment labels this option "hdfs").
            stateBackend = new FsStateBackend(flinkConf.get("flink.checkpoint.hdfs.checkpoint.data.uri"));
        } else {
            // "memory", any unrecognised type and a missing type all use the memory backend
            // (original note: state data must not exceed 5 MB).
            // NOTE(review): "rocksdb" also lands here, exactly as before -- no RocksDB backend
            // is wired up in this class. Confirm whether that is intentional.
            stateBackend = new MemoryStateBackend();
        }
        env.setStateBackend(stateBackend);
    }

    /**
     * Registers the project's user-defined functions on the table environment.
     *
     * @param tableEnv table environment that receives the function registrations
     */
    public static void registerFunction(StreamTableEnvironment tableEnv) {
        // Single-row (scalar) functions
        tableEnv.registerFunction(AmplitudeColumn.AMPLITUDECOLUMN, new AmplitudeColumn());
        tableEnv.registerFunction(MaxColumn.MAXCOLUMN, new MaxColumn());
        tableEnv.registerFunction(MinColumn.MINCOLUMN, new MinColumn());
        tableEnv.registerFunction(UnixMillisColumn.UNIX_MILLIS, new UnixMillisColumn());
        // Multi-row (aggregate) functions
        tableEnv.registerFunction(BooleanSummationFunctions.BOOLSUM, new BooleanSummationFunctions.BooleanSummationAvg());
        tableEnv.registerFunction(BooleanTrendSummationFunctions.BOOLTRENDSUM, new BooleanTrendSummationFunctions.BooleanTrendSummationAvg());
        tableEnv.registerFunction(BooleanTrendContinueFunctions.BOOLTRENDCONTINUE, new BooleanTrendContinueFunctions.BooleanTrendContinueAvg());
    }

    /**
     * Reads {@code flink.sink.parallelism} from the entity's Flink configuration
     * and stores it as the entity's parallelism.
     *
     * <p>NOTE(review): an earlier version also parsed {@code complexity_num} from a
     * single-entry rule list but never used the value. That dead read has been removed,
     * so the rule list no longer influences (or can break) this method. Re-introduce it
     * if parallelism is meant to depend on rule complexity.
     *
     * @param flinkTaskConfEntity task configuration to read from and update
     * @throws NullPointerException  if the entity's Flink configuration is {@code null}
     * @throws NumberFormatException if {@code flink.sink.parallelism} is missing or not a valid integer
     */
    public static void setParallelism(FlinkTaskConfEntity flinkTaskConfEntity) {
        String rawParallelism = Objects
                .requireNonNull(flinkTaskConfEntity.getFlinkConf(), "flinkConf must not be null")
                .get("flink.sink.parallelism");
        flinkTaskConfEntity.setParallelism(parseIntSetting("flink.sink.parallelism", rawParallelism));
    }

    /** Looks up {@code key} in {@code conf} and parses it as a long, naming the key on failure. */
    private static long longSetting(Map<String, String> conf, String key) {
        String raw = conf.get(key);
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            throw invalidNumber(key, raw, e);
        }
    }

    /** Parses {@code raw} as an int, naming {@code key} in the error if it is missing or malformed. */
    private static int parseIntSetting(String key, String raw) {
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            throw invalidNumber(key, raw, e);
        }
    }

    /** Builds a NumberFormatException (same type the raw parse throws) that identifies the setting. */
    private static NumberFormatException invalidNumber(String key, String raw, NumberFormatException cause) {
        NumberFormatException detailed =
                new NumberFormatException("Invalid or missing numeric value for '" + key + "': " + raw);
        detailed.initCause(cause);
        return detailed;
    }

}
